package com.gzc.mrg;

import com.gzc.codec.BagInfoCodec;
import com.gzc.codec.GoodsCodec;
import com.gzc.codec.PlayerCodec;
import com.gzc.enums.CollectionNameEnum;
import com.gzc.enums.DocumentKeyNameEnum;
import com.gzc.enums.MongoDBType;
import com.gzc.pojo.BagInfo;
import com.gzc.pojo.Player;
import com.mongodb.*;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.UpdateResult;
import org.bson.BsonBinaryReader;
import org.bson.BsonBinaryWriter;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.codecs.Codec;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.conversions.Bson;
import org.bson.io.BasicOutputBuffer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.*;

/**
 * MongoDB操作基类
 *
 * @author Spirit_wolf
 * @date 2018/10/12.
 */
public abstract class BaseMongoMrg {

    private CodecRegistry codecRegistry;

    protected final EnumMap<MongoDBType, MongoDatabase> dbMap = new EnumMap<>(MongoDBType.class);

    private static final int FIND_CURSOR_BATCH_SIZE = 500;

    /**
     * 创建MongoDB驱动客户端，连接到单一节点
     * @param host
     * @param port
     * @param usr
     * @param password
     * @param dataBase
     * @return
     */
    protected MongoClient createMongoClient(String host, int port, String usr, String password, String dataBase) {
        ServerAddress serverAddress = new ServerAddress(host, port);
        MongoClientOptions mongoClientOptions = constructMongoClientOptions();

        MongoCredential credential = null;
        if ("".equals(usr) || "".equals(password)) {

        } else {
            credential = MongoCredential.createCredential(usr, dataBase, password.toCharArray());
        }

        MongoClient mongoClient = null;
        if (null == credential) {
            mongoClient = new MongoClient(serverAddress, mongoClientOptions);
        } else {
            mongoClient = new MongoClient(serverAddress, credential, mongoClientOptions);
        }

        return mongoClient;
    }

    /**
     * 创建MongoDB驱动客户端,连接到副本集
     *
     * @param addressList 多个服务器的地址列表
     * @param usr
     * @param password
     * @param dataBase
     * @return
     */
    protected MongoClient createMongoClient(List<ServerAddress> addressList, String usr, String password, String dataBase) {
        assert null != addressList && addressList.size() > 0;

        MongoClientOptions mongoClientOptions = constructMongoClientOptions();

        MongoCredential credential = null;
        if ("".equals(usr) || "".equals(password)) {

        } else {
            credential = MongoCredential.createCredential(usr, dataBase, password.toCharArray());
        }

        MongoClient mongoClient = null;
        if (null == credential) {
            mongoClient = new MongoClient(addressList, mongoClientOptions);
        } else {
            mongoClient = new MongoClient(addressList, credential, mongoClientOptions);
        }

        return mongoClient;
    }

    //TODO 连接mongos，操作分片

    /**
     * 构造MongoClientOptions对象
     *
     * @return
     */
    private MongoClientOptions constructMongoClientOptions() {
        final CodecRegistry fromCodecs = CodecRegistries.fromCodecs(new GoodsCodec());
        final CodecProvider codecProvider = new CodecProvider() {
            @Override
            public <T> Codec<T> get(Class<T> clazz, CodecRegistry registry) {
                if (clazz == Player.class) {
                    return (Codec<T>) new PlayerCodec(registry);
                } else if (clazz == BagInfo.class) {
                    return (Codec<T>) new BagInfoCodec(registry);
                } else {
                    return null;
                }
            }
        };

        final CodecRegistry codecRegistry = CodecRegistries.fromRegistries(MongoClient.getDefaultCodecRegistry(),
                fromCodecs,
                CodecRegistries.fromProviders(codecProvider));
        this.setCodecRegistry(codecRegistry);

        final MongoClientOptions mongoClientOptions = MongoClientOptions.builder().
                connectionsPerHost(2).
                codecRegistry(getCodecRegistry()).
                build();

        return mongoClientOptions;
    }

    protected MongoDatabase getDataBase(MongoDBType mongoDBType) {
        return dbMap.get(mongoDBType);
    }

    /**
     * 获取一个集合
     *
     * @param mongoDatabase
     * @param collectionName
     * @return
     */
    protected MongoCollection<Document> getCollection(MongoDatabase mongoDatabase, CollectionNameEnum collectionName) {
        return getCollection(mongoDatabase, collectionName, Document.class);
    }

    protected MongoCollection<Document> getCollection(MongoDatabase mongoDatabase, String collectionName) {
        return getCollection(mongoDatabase, collectionName, Document.class);
    }

    protected <T> MongoCollection<T> getCollection(MongoDatabase mongoDatabase, CollectionNameEnum collectionName,
                                                   Class<T> clazz) {
        return mongoDatabase.getCollection(collectionName.name(), clazz);
    }

    protected <T> MongoCollection<T> getCollection(MongoDatabase mongoDatabase, String collectionName,
                                                   Class<T> clazz) {
        return mongoDatabase.getCollection(collectionName, clazz);
    }

    /**
     * 获取一个集合
     *
     * @param mongoDBType
     * @param collectionNameEnum
     * @param clazz
     * @param <T>
     * @return
     */
    public <T> MongoCollection<T> getCollection(MongoDBType mongoDBType, CollectionNameEnum collectionNameEnum,
                                                Class<T> clazz) {
        return getDataBase(mongoDBType).getCollection(collectionNameEnum.name(), clazz);
    }

    /**
     * 新建索引
     *
     * @param mongoDBType
     * @param collectionNameEnum
     * @param key
     */
    public void createIndex(MongoDBType mongoDBType, CollectionNameEnum collectionNameEnum, String key) {
        MongoCollection<Document> collection = getCollection(getDataBase(mongoDBType), collectionNameEnum);
        List<Document> indexList = new ArrayList<>();

        for (Document index : indexList) {
            Document value = (Document) index.get("key");
            if (value.get(key) != null) {
                return;
            }
        }
        //说明是个新索引
        collection.createIndex(new Document(key, 1));
    }

    /**
     * 插入一个文档
     *
     * @param mongoDatabase
     * @param collectionName
     * @param object
     * @param <T>
     */
    public <T> void insertDocument(MongoDatabase mongoDatabase, CollectionNameEnum collectionName, T object) {
        MongoCollection<T> collection = getCollection(mongoDatabase, collectionName, (Class<T>) object.getClass());
        collection.insertOne(object);
    }

    public <T> void insertDocument(MongoDBType mongoDBType, CollectionNameEnum collectionName, T object) {
        insertDocument(getDataBase(mongoDBType), collectionName, object);
    }

    /**
     * 查询匹配的文档列表
     *
     * @param mongoDatabase
     * @param collectionName
     * @param filterBson
     * @param clazz
     * @param <T>
     * @return
     */
    public <T> List<T> findDocument(MongoDatabase mongoDatabase, String collectionName, Bson filterBson,
                                    Class<T> clazz) {
        List<T> result = new ArrayList<>();
        MongoCollection<T> collection = this.getCollection(mongoDatabase, collectionName, clazz);

        collection.find(filterBson).
                into(result);

        return result;
    }

    public <T> List<T> findDocuments(MongoDBType mongoDBType, CollectionNameEnum collectionName, Bson filterBson,
                                     Class<T> clazz) {
        return findDocument(getDataBase(mongoDBType), collectionName.name(), filterBson, clazz);
    }

    public <T> List<T> findDocuments(MongoDBType mongoDBType, String collectionName, Bson filterBson, Class<T> clazz) {
        return findDocument(getDataBase(mongoDBType), collectionName, filterBson, clazz);
    }

    /**
     * 查询第一个匹配的文档
     *
     * @param mongoDatabase
     * @param collectionName
     * @param filterBson
     * @param clazz
     * @param <T>
     * @return
     */
    public <T> T findFirstDocument(MongoDatabase mongoDatabase, CollectionNameEnum collectionName, Bson filterBson,
                                   Class<T> clazz) {
        MongoCollection<T> collection = this.getCollection(mongoDatabase, collectionName, clazz);
        return collection.find(filterBson).
                first();
    }

    public <T> T findFirstDocument(MongoDBType mongoDBType, String collectionName, Bson filterBson, Class<T> clazz) {
        return findFirstDocument(getDataBase(mongoDBType), collectionName, filterBson, clazz);
    }

    public <T> T findFirstDocument(MongoDBType mongoDBType, CollectionNameEnum collectionName, Bson filterBson, Class<T> clazz) {
        return findFirstDocument(getDataBase(mongoDBType), collectionName, filterBson, clazz);
    }

    public <T> T findFirstDocument(MongoDatabase mongoDatabase, String collectionName, Bson filterBson, Class<T> clazz) {
        MongoCollection<T> collection = this.getCollection(mongoDatabase, collectionName, clazz);

        return collection.find(filterBson).
                first();
    }

    /**
     * 查询第一个匹配的文档，只返回该文档的部分字段
     *
     * @param mongoDatabase
     * @param collectionName
     * @param filterBson
     * @param resultKeyNameList
     * @return
     */
    protected Document findFirstDocument(MongoDatabase mongoDatabase, String collectionName, Bson filterBson,
                                         List<String> resultKeyNameList) {
        return getCollection(mongoDatabase, collectionName).
                find(filterBson).
                projection(
                        Projections.include(resultKeyNameList)).
                first();
    }

    public List<Document> findDocumentsWithoutObjectId(MongoDBType mongoDBType, String collectionName, Bson filterBson,

                                                       List<String> keyList) {
        List<Document> rt = new LinkedList<>();

        return getCollection(getDataBase(mongoDBType),
                collectionName).
                find(filterBson).
                projection(Filters.and(Projections.include(keyList),
                        Projections.exclude(DocumentKeyNameEnum._id.name()))).
                into(rt);
    }

    /**
     * 查询第一个匹配的文档，返回部分字段
     *
     * @param mongoDBType
     * @param collectionName
     * @param filterBson
     * @param resultKeyNameList 要查询的字段名字集合
     * @return
     */
    public Document findFirstDocument(MongoDBType mongoDBType, CollectionNameEnum collectionName, Bson filterBson,
                                      List<String> resultKeyNameList) {
        return findFirstDocument(getDataBase(mongoDBType), collectionName.name(), filterBson, resultKeyNameList);
    }

    /**
     * 查询第一个匹配的文档，返回部分字段
     *
     * @param mongoDBType
     * @param collectionName
     * @param filterBson
     * @param resultKeyNameList 要查询的字段名字集合
     * @return
     */
    public Document findFirstDocument(MongoDBType mongoDBType, String collectionName, Bson filterBson,
                                      List<String> resultKeyNameList) {
        return findFirstDocument(getDataBase(mongoDBType), collectionName, filterBson, resultKeyNameList);
    }

    protected <T> UpdateResult replaceDocument(MongoDatabase mongoDatabase, CollectionNameEnum collectionName,
                                               Bson filterBson, T object) {
        MongoCollection collection = this.getCollection(mongoDatabase, collectionName, (Class<T>) object.getClass());
        return collection.replaceOne(filterBson, object);
    }

    /**
     * 替换文档
     *
     * @param mongoDBType
     * @param collectionName
     * @param filterBson
     * @param object
     * @param <T>
     * @return
     */
    public <T> UpdateResult replaceDocument(MongoDBType mongoDBType, CollectionNameEnum collectionName, Bson filterBson,
                                            T object) {
        return replaceDocument(getDataBase(mongoDBType), collectionName, filterBson, object);
    }

    /**
     * 根据匹配条件替换一个文档，若目标文档不存在，则插入该文档
     *
     * @param mongoDBType
     * @param collectionName
     * @param filterBson
     * @param object
     * @param <T>
     * @return
     */
    public <T> UpdateResult replaceDocumentWithInsertIfAbsent(MongoDBType mongoDBType, CollectionNameEnum collectionName, Bson filterBson, T object) {
        MongoCollection<T> collection = this.getCollection(
                getDataBase(mongoDBType),
                collectionName, (Class<T>) object.getClass());

        return collection.replaceOne(filterBson, object,
                new ReplaceOptions().
                        upsert(true));
    }

    /**
     * 查询对应分页的数据。由于用的skip所以不可查太多页，否则性能很差
     *
     * @param mongoDatabase
     * @param collectionName
     * @param filterBson
     * @param clazz
     * @param pageNum        从1开始
     * @param sizePerPage
     * @param <T>
     * @return
     */
    protected <T> List<T> findDocumentsByPage(MongoDatabase mongoDatabase, CollectionNameEnum collectionName,
                                              Bson filterBson, Class<T> clazz, int pageNum, int sizePerPage) {
        List<T> result = new ArrayList<>();
        final int skipNum = (pageNum - 1) * sizePerPage;

        MongoCollection<T> collection = this.getCollection(mongoDatabase, collectionName, clazz);
        collection.find(filterBson).
                skip(skipNum).
                limit(sizePerPage).
                into(result);

        return result;
    }

    /**
     * 查询对应分页的数据
     *
     * @param mongoDBType
     * @param collectionName
     * @param filterBson
     * @param clazz
     * @param pageNum        从1开始
     * @param sizePerPage
     * @param <T>
     * @return
     */
    public <T> List<T> findDocumentByPage(MongoDBType mongoDBType, CollectionNameEnum collectionName, Bson filterBson,
                                          Class<T> clazz, int pageNum, int sizePerPage) {
        return findDocumentsByPage(
                getDataBase(mongoDBType),
                collectionName, filterBson, clazz, pageNum, sizePerPage);
    }

    /**
     * 查询有限个数的文档对象列表
     *
     * @param mongoDatabase
     * @param collectionName
     * @param filterBson
     * @param clazz
     * @param limitNum
     * @param <T>
     * @return
     */
    protected <T> List<T> findDocumentsWithLimit(MongoDatabase mongoDatabase, CollectionNameEnum collectionName,
                                                 Bson filterBson, Class<T> clazz, int limitNum) {
        List<T> result = new ArrayList<>();

        MongoCollection<T> collection = this.getCollection(mongoDatabase, collectionName, clazz);
        collection.find(filterBson).
                limit(limitNum).
                into(result);

        return result;
    }

    /**
     * 查询有限个数的文档对象列表
     *
     * @param mongoDBType
     * @param collectionName
     * @param filterBson
     * @param clazz
     * @param limitNum
     * @param <T>
     * @return
     */
    public <T> List<T> findDocumentsWithLimit(MongoDBType mongoDBType, CollectionNameEnum collectionName,
                                              Bson filterBson, Class<T> clazz, int limitNum) {
        return findDocumentsWithLimit(
                getDataBase(mongoDBType),
                collectionName, filterBson, clazz, limitNum);
    }

    /**
     * 集合的文档数量
     *
     * @param mongoDBType
     * @param collectionName
     * @return
     */
    public long countCollection(MongoDBType mongoDBType, CollectionNameEnum collectionName) {
        return countCollection(mongoDBType, collectionName, new Document());
    }

    /**
     * 集合的文档数量
     *
     * @param mongoDatabase
     * @param collectionName
     * @param filter
     * @return
     */
    protected long countCollection(MongoDatabase mongoDatabase, CollectionNameEnum collectionName, Bson filter) {
        return getCollection(mongoDatabase, collectionName).countDocuments(filter);
    }

    /**
     * 集合的文档数量
     *
     * @param mongoDBType
     * @param collectionName
     * @param filter
     * @return
     */
    public long countCollection(MongoDBType mongoDBType, CollectionNameEnum collectionName, Bson filter) {
        return getCollection(getDataBase(mongoDBType), collectionName).countDocuments(filter);
    }

    /**
     * 删除一个匹配的文档
     *
     * @param mongoDatabase
     * @param collectionName
     * @param filterBson
     */
    protected void deleteOneDocument(MongoDatabase mongoDatabase, CollectionNameEnum collectionName, Bson filterBson) {
        MongoCollection<Document> collection = getCollection(mongoDatabase, collectionName);
        collection.deleteOne(filterBson);
    }

    /**
     * 删除一个匹配的文档
     *
     * @param mongoDBType
     * @param collectionName
     * @param filterBson
     */
    public void deleteOneDocument(MongoDBType mongoDBType, CollectionNameEnum collectionName, Bson filterBson) {
        deleteOneDocument(getDataBase(mongoDBType), collectionName, filterBson);
    }

    /**
     * 删除多个匹配的文档
     *
     * @param mongoDatabase
     * @param collectionName
     * @param filterBson
     */
    protected void deleteManyDocument(MongoDatabase mongoDatabase, CollectionNameEnum collectionName, Bson filterBson) {
        MongoCollection<Document> collection = getCollection(mongoDatabase, collectionName);
        collection.deleteMany(filterBson);
    }

    /**
     * 删除多个匹配的文档
     *
     * @param mongoDBType
     * @param collectionName
     * @param filterBson
     */
    public void deleteManyDocument(MongoDBType mongoDBType, CollectionNameEnum collectionName, Bson filterBson) {
        deleteManyDocument(getDataBase(mongoDBType), collectionName, filterBson);
    }

    /**
     * 更新一个文档
     *
     * @param mongoDatabase
     * @param collectionName
     * @param filterBson
     * @param updateBson
     */
    protected void updateOneDocument(MongoDatabase mongoDatabase, CollectionNameEnum collectionName,
                                     Bson filterBson, Bson updateBson) {
        MongoCollection<BsonDocument> collection = this.getCollection(mongoDatabase, collectionName, BsonDocument.class);
        collection.updateOne(filterBson, updateBson);
    }

    /**
     * 更新一个文档
     *
     * @param mongoDatabase
     * @param collectionName
     * @param filterBson
     * @param updateBson
     */
    protected void updateOneDocument(MongoDatabase mongoDatabase, String collectionName,
                                     Bson filterBson, Bson updateBson) {
        MongoCollection<BsonDocument> collection = this.getCollection(mongoDatabase, collectionName, BsonDocument.class);
        collection.updateOne(filterBson, updateBson);
    }

    protected void updateOneDocument(MongoDatabase mongoDatabase, String collectionName, Bson filterBson, Bson updateBson,
                                     UpdateOptions updateOptions) {
        MongoCollection<BsonDocument> collection = this.getCollection(mongoDatabase, collectionName, BsonDocument.class);
        collection.updateOne(filterBson, updateBson, updateOptions);
    }

    /**
     * 更新一个文档
     *
     * @param mongoDBType
     * @param collectionName
     * @param filterBson
     * @param updateBson
     */
    public void updateOneDocument(MongoDBType mongoDBType, CollectionNameEnum collectionName,
                                  Bson filterBson, Bson updateBson) {
        updateOneDocument(getDataBase(mongoDBType), collectionName, filterBson, updateBson);
    }

    /**
     * 更新一个文档
     *
     * @param mongoDBType
     * @param collName
     * @param filterBson
     * @param updateBson
     */
    public void updateOneDocument(MongoDBType mongoDBType, String collName, Bson filterBson, Bson updateBson) {
        updateOneDocument(getDataBase(mongoDBType), collName, filterBson, updateBson);
    }

    protected void updateManyDocument(MongoDatabase mongoDatabase, CollectionNameEnum collectionName, Bson filterBson,
                                      Bson updateBson) {
        MongoCollection<BsonDocument> collection = this.getCollection(mongoDatabase, collectionName, BsonDocument.class);
        collection.updateMany(filterBson, updateBson);
    }

    /**
     * 更新多个文档
     *
     * @param mongoDBType
     * @param collectionName
     * @param filterBson
     * @param updateBson
     */
    public void updateManyDocument(MongoDBType mongoDBType, CollectionNameEnum collectionName,
                                   Bson filterBson, Bson updateBson) {
        updateManyDocument(this.getDataBase(mongoDBType), collectionName, filterBson, updateBson);
    }

    /**
     * 如果没有符合更新条件的文档，就以更新条件和更新文档为基础创建一个新文档
     *
     * @param mongoDBType
     * @param collectionName
     * @param filterBson
     * @param updateBson
     */
    public void upsertOneDocument(MongoDBType mongoDBType, String collectionName, Bson filterBson, Bson updateBson) {
        updateOneDocument(this.getDataBase(mongoDBType), collectionName, filterBson, updateBson, new UpdateOptions().upsert(true));
    }

    /**
     * 删除集合
     *
     * @param mongoDBType
     * @param collectionName
     */
    public void dropCollection(MongoDBType mongoDBType, CollectionNameEnum collectionName) {
        getCollection(getDataBase(mongoDBType), collectionName).drop();
    }

    /**
     * 删除集合
     *
     * @param mongoDBType
     * @param collectionName
     */
    public void dropCollection(MongoDBType mongoDBType, String collectionName) {
        getCollection(getDataBase(mongoDBType), collectionName).drop();
    }

    public Document findMaxOrMin(MongoDBType mongoDBType, CollectionNameEnum collectionName, Bson filter) {
        MongoCollection<Document> s = getCollection(getDataBase(mongoDBType), collectionName);
        List<Document> rt = new ArrayList<>();
        s.find(new Document()).
                sort(filter).
                limit(1).
                into(rt).
                get(0);

        return rt.get(0);
    }

    public <T> Iterator<T> findCursor(MongoDBType mongoDBType, CollectionNameEnum collectionName, Bson filterBson,
                                      Class<T> clazz) {
        return getCollection(getDataBase(mongoDBType), collectionName, clazz).
                find(filterBson).
                batchSize(FIND_CURSOR_BATCH_SIZE).
                iterator();
    }

    public <T> Iterator<T> findCursor(MongoDBType mongoDBType, CollectionNameEnum collectionName, Bson filterBson,
                                      List<String> resultKeyList, Class<T> clazz) {
        return getCollection(getDataBase(mongoDBType), collectionName, clazz).
                find(filterBson).
                projection(Projections.include(resultKeyList)).
                batchSize(FIND_CURSOR_BATCH_SIZE).
                iterator();
    }

    public <T> Iterator<T> findCursor(MongoDatabase mongoDatabase, CollectionNameEnum collectionName, Bson filterBson,
                                      Class<T> clazz) {
        return getCollection(mongoDatabase, collectionName, clazz).
                find(filterBson).
                batchSize(FIND_CURSOR_BATCH_SIZE).
                iterator();
    }

    public boolean checkExist(MongoDBType mongoDBType, CollectionNameEnum collectionName, String documentKeyName,
                              Object value) {
        Document result = findFirstDocument(mongoDBType, collectionName,
                Filters.eq(documentKeyName, value),
                Document.class);

        if (null != result) {
            return true;
        }
        return false;
    }

    /**
     * 重命名集合的名字
     *
     * @param mongoDBType
     * @param collectionName
     * @param newName
     * @return
     */
    public boolean renameCollection(MongoDBType mongoDBType, String collectionName, String newName) {
        MongoDatabase mongoDatabase = getDataBase(mongoDBType);
        getCollection(mongoDatabase, collectionName).
                renameCollection(
                        new MongoNamespace(mongoDatabase.getName() + "." + newName));

        return true;
    }

    /**
     * 创建索引
     */
    public abstract void createNeedIndex();

    public CodecRegistry getCodecRegistry() {
        return codecRegistry;
    }

    public void setCodecRegistry(CodecRegistry codecRegistry) {
        this.codecRegistry = codecRegistry;
    }

    /**
     * 使用对应的Codec序列化数据bean对象
     *
     * @param bean 数据bean对象
     * @param <T>  任意数据bean对象类型
     * @return 字节数组
     */
    public <T> byte[] serializable(T bean) {
        BasicOutputBuffer outputBuffer = new BasicOutputBuffer();

        try (BsonBinaryWriter writer = new BsonBinaryWriter(outputBuffer)) {
            Codec codec = codecRegistry.get(bean.getClass());
            codec.encode(writer, bean, null);
            return outputBuffer.toByteArray();
        }
    }

    /**
     * 使用对应的Codec反序列化字节数组还原为数据bean对象
     *
     * @param clazz
     * @param input
     * @param <T>
     * @return 数据bean对象
     * @throws IOException
     */
    public <T> T deserializable(Class<T> clazz, InputStream input) throws IOException {
        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[1024];
            int len = -1;
            while ((len = input.read(buffer)) > 0) {
                outputStream.write(buffer, 0, len);
            }

            BsonBinaryReader bsonBinaryReader = new BsonBinaryReader(ByteBuffer.wrap(outputStream.toByteArray()));
            Codec<T> codec = codecRegistry.get(clazz);
            return codec.decode(bsonBinaryReader, null);
        } finally {
            input.close();
        }
    }

}
